use crate::io::driver::platform;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf, Registration};

use mio::event::Evented;
use std::fmt;
use std::io::{self, Read, Write};
use std::marker::Unpin;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::task::{Context, Poll};

cfg_io_driver! {
    /// Associates an I/O resource that implements the [`std::io::Read`] and/or
    /// [`std::io::Write`] traits with the reactor that drives it.
    ///
    /// `PollEvented` uses [`Registration`] internally to take a type that
    /// implements [`mio::Evented`] as well as [`std::io::Read`] and/or
    /// [`std::io::Write`] and associate it with a reactor that will drive it.
    ///
    /// Once the [`mio::Evented`] type is wrapped by `PollEvented`, it can be
    /// used from within the future's execution model. As such, the
    /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`]
    /// implementations using the underlying I/O resource as well as readiness
    /// events provided by the reactor.
    ///
    /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is
    /// `Sync`), the caller must ensure that there are at most two tasks that
    /// use a `PollEvented` instance concurrently. One for reading and one for
    /// writing. While violating this requirement is "safe" from a Rust memory
    /// model point of view, it will result in unexpected behavior in the form
    /// of lost notifications and tasks hanging.
    ///
    /// ## Readiness events
    ///
    /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations,
    /// this type also supports access to the underlying readiness event stream.
    /// While similar in function to what [`Registration`] provides, the
    /// semantics are a bit different.
    ///
    /// Two functions are provided to access the readiness events:
    /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the
    /// current readiness state of the `PollEvented` instance. If
    /// [`poll_read_ready`] indicates read readiness, immediately calling
    /// [`poll_read_ready`] again will also indicate read readiness.
    ///
    /// When the operation is attempted and is unable to succeed due to the I/O
    /// resource not being ready, the caller must call [`clear_read_ready`] or
    /// [`clear_write_ready`]. This clears the readiness state until a new
    /// readiness event is received.
    ///
    /// This allows the caller to implement additional functions. For example,
    /// [`TcpListener`] implements `poll_accept` by using [`poll_read_ready`] and
    /// [`clear_read_ready`].
    ///
    /// ```rust
    /// use tokio::io::PollEvented;
    ///
    /// use futures::ready;
    /// use mio::Ready;
    /// use mio::net::{TcpStream, TcpListener};
    /// use std::io;
    /// use std::task::{Context, Poll};
    ///
    /// struct MyListener {
    ///     poll_evented: PollEvented<TcpListener>,
    /// }
    ///
    /// impl MyListener {
    ///     pub fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<Result<TcpStream, io::Error>> {
    ///         let ready = Ready::readable();
    ///
    ///         ready!(self.poll_evented.poll_read_ready(cx, ready))?;
    ///
    ///         match self.poll_evented.get_ref().accept() {
    ///             Ok((socket, _)) => Poll::Ready(Ok(socket)),
    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
    ///                 self.poll_evented.clear_read_ready(cx, ready)?;
    ///                 Poll::Pending
    ///             }
    ///             Err(e) => Poll::Ready(Err(e)),
    ///         }
    ///     }
    /// }
    /// ```
    ///
    /// ## Platform-specific events
    ///
    /// `PollEvented` also allows receiving platform-specific `mio::Ready` events.
    /// These events are included as part of the read readiness event stream. The
    /// write readiness event stream is only for `Ready::writable()` events.
    ///
    /// [`std::io::Read`]: trait@std::io::Read
    /// [`std::io::Write`]: trait@std::io::Write
    /// [`AsyncRead`]: trait@AsyncRead
    /// [`AsyncWrite`]: trait@AsyncWrite
    /// [`mio::Evented`]: trait@mio::Evented
    /// [`Registration`]: struct@Registration
    /// [`TcpListener`]: struct@crate::net::TcpListener
    /// [`clear_read_ready`]: method@Self::clear_read_ready
    /// [`clear_write_ready`]: method@Self::clear_write_ready
    /// [`poll_read_ready`]: method@Self::poll_read_ready
    /// [`poll_write_ready`]: method@Self::poll_write_ready
    pub struct PollEvented<E: Evented> {
        /// The wrapped I/O resource. It is `Some` from construction until
        /// `into_inner` or `Drop` takes it out in order to deregister it.
        io: Option<E>,
        /// The reactor registration together with the cached read and write
        /// readiness.
        inner: Inner,
    }
}

/// Bookkeeping shared by the readiness methods of `PollEvented`: the
/// registration handle plus the readiness bits observed so far for each
/// direction.
struct Inner {
    /// Handle used to poll for readiness events and to deregister the I/O
    /// resource.
    registration: Registration,

    /// Currently visible read readiness, stored as the `usize` encoding of a
    /// `mio::Ready`
    read_readiness: AtomicUsize,

    /// Currently visible write readiness, stored as the `usize` encoding of a
    /// `mio::Ready`
    write_readiness: AtomicUsize,
}

// ===== impl PollEvented =====

// Shared body of `poll_read_ready` and `poll_write_ready`.
//
// Arguments:
//
// * `$me`    - the `PollEvented` being polled.
// * `$mask`  - the readiness bits the caller is interested in.
// * `$cache` - name of the `Inner` field caching readiness for this direction.
// * `$take`  - name of the `Registration` method that reports newly observed
//              readiness without requesting a notification.
// * `$poll`  - expression that polls the registration for this direction.
//
// The expansion uses `?` and contains `return` statements, so it has to be
// expanded inside a function returning `Poll<io::Result<mio::Ready>>`, as the
// tail expression of that function.
macro_rules! poll_ready {
    ($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{
        // Load cached & encoded readiness.
        let mut cached = $me.inner.$cache.load(Relaxed);
        // HUP and error are always reported in addition to the requested bits.
        let mask = $mask | platform::hup() | platform::error();

        // See if the current readiness matches any bits.
        // NOTE(review): this check uses the caller's `$mask`, not the widened
        // `mask` above, so cached HUP / error bits alone do not take the
        // fast path below -- confirm this is intended.
        let mut ret = mio::Ready::from_usize(cached) & $mask;

        if ret.is_empty() {
            // Readiness does not match, consume the registration's readiness
            // stream. This happens in a loop to ensure that the stream gets
            // drained.
            loop {
                // Errors are propagated with `?`; `Pending` returns from the
                // enclosing function.
                let ready = match $poll? {
                    Poll::Ready(v) => v,
                    Poll::Pending => return Poll::Pending,
                };
                cached |= ready.as_usize();

                // Update the cache store
                $me.inner.$cache.store(cached, Relaxed);

                ret |= ready & mask;

                // Stop as soon as any bit of interest (including HUP / error)
                // has been observed; only the matching bits are returned.
                if !ret.is_empty() {
                    return Poll::Ready(Ok(ret));
                }
            }
        } else {
            // Check what's new with the registration stream. This will not
            // request to be notified
            if let Some(ready) = $me.inner.registration.$take()? {
                cached |= ready.as_usize();
                $me.inner.$cache.store(cached, Relaxed);
            }

            // On this path the whole cached readiness is returned, not only
            // the bits selected by the mask.
            Poll::Ready(Ok(mio::Ready::from_usize(cached)))
        }
    }};
}

impl<E> PollEvented<E>
where
    E: Evented,
{
    /// Creates a new `PollEvented` associated with the default reactor.
    ///
    /// # Panics
    ///
    /// This function panics if thread-local runtime is not set.
    ///
    /// The runtime is usually set implicitly when this function is called
    /// from a future driven by a tokio runtime, otherwise runtime can be set
    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
    pub fn new(io: E) -> io::Result<Self> {
        PollEvented::new_with_ready(io, mio::Ready::all())
    }

    /// Creates a new `PollEvented` associated with the default reactor, for a
    /// specific `mio::Ready` state.
    ///
    /// Prefer `new_with_ready` over `new` when you need control over the
    /// readiness state, such as when a file descriptor only allows reads. This
    /// function does not add `hup` or `error`, so if you are interested in
    /// those states you must include them in the readiness state passed in.
    ///
    /// An example of a readiness state for listening to reads only:
    ///
    /// ```rust
    /// ##[cfg(unix)]
    ///     mio::Ready::from_usize(
    ///         mio::Ready::readable().as_usize()
    ///         | mio::unix::UnixReady::error().as_usize()
    ///         | mio::unix::UnixReady::hup().as_usize()
    ///     );
    /// ```
    ///
    /// # Panics
    ///
    /// This function panics if the thread-local runtime is not set.
    ///
    /// The runtime is usually set implicitly when this function is called
    /// from a future driven by a tokio runtime; otherwise the runtime can be
    /// set explicitly with the
    /// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
    pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> {
        // Registration happens first; on failure the error is returned and
        // `io` is dropped without ever being wrapped.
        let inner = Inner {
            registration: Registration::new_with_ready(&io, ready)?,
            // No readiness has been observed yet in either direction.
            read_readiness: AtomicUsize::new(0),
            write_readiness: AtomicUsize::new(0),
        };

        Ok(Self {
            io: Some(io),
            inner,
        })
    }

    /// Returns a shared reference to the underlying I/O object this readiness
    /// stream is wrapping.
    pub fn get_ref(&self) -> &E {
        // `io` is only emptied by `into_inner` (which consumes `self`) and by
        // `Drop`, so it is expected to be `Some` whenever this can be called.
        self.io.as_ref().unwrap()
    }

    /// Returns a mutable reference to the underlying I/O object this readiness
    /// stream is wrapping.
    pub fn get_mut(&mut self) -> &mut E {
        // `io` is only emptied by `into_inner` (which consumes `self`) and by
        // `Drop`, so it is expected to be `Some` whenever this can be called.
        self.io.as_mut().unwrap()
    }

    /// Consumes `self` and hands back the wrapped I/O object.
    ///
    /// The I/O resource is deregistered from the reactor before it is
    /// returned. If the deregistration operation fails, the error is returned
    /// instead of the I/O object.
    ///
    /// Note that deregistering does not guarantee that the I/O resource can be
    /// registered with a different reactor. Some I/O resource types can only be
    /// associated with a single reactor instance for their lifetime.
    pub fn into_inner(mut self) -> io::Result<E> {
        // Taking the resource leaves `None` behind, so the `Drop` impl that
        // runs when `self` goes out of scope will not deregister a second time.
        let resource = self.io.take().unwrap();

        self.inner
            .registration
            .deregister(&resource)
            .map(|_| resource)
    }

    /// Checks the I/O resource's read readiness state.
    ///
    /// The `mask` argument allows specifying what readiness to notify on. This
    /// can be any value, including platform specific readiness, **except**
    /// `writable`. HUP is always implicitly included on platforms that support
    /// it.
    ///
    /// If the resource is not ready for a read then `Poll::Pending` is returned
    /// and the current task is notified once a new event is received.
    ///
    /// The I/O resource will remain in a read-ready state until readiness is
    /// cleared by calling [`clear_read_ready`].
    ///
    /// [`clear_read_ready`]: method@Self::clear_read_ready
    ///
    /// # Panics
    ///
    /// This function panics if:
    ///
    /// * `mask` includes writable.
    /// * called from outside of a task context.
    ///
    /// # Warning
    ///
    /// This method may not be called concurrently. It takes `&self` to allow
    /// calling it concurrently with `poll_write_ready`.
    pub fn poll_read_ready(
        &self,
        cx: &mut Context<'_>,
        mask: mio::Ready,
    ) -> Poll<io::Result<mio::Ready>> {
        assert!(!mask.is_writable(), "cannot poll for write readiness");
        // The macro expands to this function's tail expression and may also
        // `return` early from it.
        poll_ready!(
            self,
            mask,
            read_readiness,
            take_read_ready,
            self.inner.registration.poll_read_ready(cx)
        )
    }

    /// Clears the I/O resource's read readiness state and registers the current
    /// task to be notified once a read readiness event is received.
    ///
    /// After calling this function, `poll_read_ready` will return
    /// `Poll::Pending` until a new read readiness event has been received.
    ///
    /// The `ready` argument specifies the readiness bits to clear. This may not
    /// include `writable` or `hup`.
    ///
    /// # Panics
    ///
    /// This function panics if:
    ///
    /// * `ready` includes writable or HUP
    /// * called from outside of a task context.
    pub fn clear_read_ready(&self, cx: &mut Context<'_>, ready: mio::Ready) -> io::Result<()> {
        // Cannot clear write readiness
        assert!(!ready.is_writable(), "cannot clear write readiness");
        assert!(!platform::is_hup(ready), "cannot clear HUP readiness");

        // Drop the requested bits from the cached read readiness.
        self.inner
            .read_readiness
            .fetch_and(!ready.as_usize(), Relaxed);

        // Poll again now that the cache has been cleared. If readiness is
        // reported straight away, wake the task so that it is polled again
        // instead of waiting for an event.
        if self.poll_read_ready(cx, ready)?.is_ready() {
            // Notify the current task
            cx.waker().wake_by_ref();
        }

        Ok(())
    }

    /// Checks the I/O resource's write readiness state.
    ///
    /// This always checks for writable readiness and also checks for HUP
    /// readiness on platforms that support it.
    ///
    /// If the resource is not ready for a write then `Poll::Pending` is
    /// returned and the current task is notified once a new event is received.
    ///
    /// The I/O resource will remain in a write-ready state until readiness is
    /// cleared by calling [`clear_write_ready`].
    ///
    /// [`clear_write_ready`]: method@Self::clear_write_ready
    ///
    /// # Panics
    ///
    /// This function panics if called from outside of a task context.
    ///
    /// # Warning
    ///
    /// This method may not be called concurrently. It takes `&self` to allow
    /// calling it concurrently with `poll_read_ready`.
    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
        // The mask is fixed to `writable`; the macro expands to this
        // function's tail expression and may also `return` early from it.
        poll_ready!(
            self,
            mio::Ready::writable(),
            write_readiness,
            take_write_ready,
            self.inner.registration.poll_write_ready(cx)
        )
    }

    /// Resets the I/O resource's write readiness state and registers the current
    /// task to be notified once a write readiness event is received.
    ///
    /// This only clears writable readiness. HUP (on platforms that support HUP)
    /// cannot be cleared as it is a final state.
    ///
    /// After calling this function, `poll_write_ready` will return
    /// `Poll::Pending` until a new write readiness event has been received.
    ///
    /// # Panics
    ///
    /// This function will panic if called from outside of a task context.
    pub fn clear_write_ready(&self, cx: &mut Context<'_>) -> io::Result<()> {
        let ready = mio::Ready::writable();

        // Drop the writable bit from the cached write readiness.
        self.inner
            .write_readiness
            .fetch_and(!ready.as_usize(), Relaxed);

        // Poll again now that the cache has been cleared. If readiness is
        // reported straight away, wake the task so that it is polled again
        // instead of waiting for an event.
        if self.poll_write_ready(cx)?.is_ready() {
            // Notify the current task
            cx.waker().wake_by_ref();
        }

        Ok(())
    }
}

// ===== Read / Write impls =====

impl<E> AsyncRead for PollEvented<E>
where
    E: Evented + Read + Unpin,
{
    /// Waits for read readiness, then attempts a single `read` on the wrapped
    /// I/O object into the unfilled part of `buf`.
    ///
    /// A `WouldBlock` result clears the cached read readiness and yields
    /// `Poll::Pending`; any other result is returned as `Poll::Ready`, with
    /// the number of bytes read recorded in `buf` on success.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        ready!(self.poll_read_ready(cx, mio::Ready::readable()))?;

        // The `Read` implementation may inspect the buffer it is handed, so
        // the unfilled region has to be initialized before it is passed on.
        let outcome = (*self).get_mut().read(buf.initialize_unfilled());

        if !is_wouldblock(&outcome) {
            return Poll::Ready(outcome.map(|n| {
                buf.add_filled(n);
            }));
        }

        // The resource was not actually ready: reset the cached readiness and
        // wait for the next event.
        self.clear_read_ready(cx, mio::Ready::readable())?;
        Poll::Pending
    }
}

impl<E> AsyncWrite for PollEvented<E>
where
    E: Evented + Write + Unpin,
{
    /// Waits for write readiness, then attempts a single `write` of `buf` on
    /// the wrapped I/O object.
    ///
    /// A `WouldBlock` result clears the cached write readiness and yields
    /// `Poll::Pending`; any other result is returned as `Poll::Ready`.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        ready!(self.poll_write_ready(cx))?;

        let outcome = (*self).get_mut().write(buf);

        if !is_wouldblock(&outcome) {
            return Poll::Ready(outcome);
        }

        // The resource was not actually ready: reset the cached readiness and
        // wait for the next event.
        self.clear_write_ready(cx)?;
        Poll::Pending
    }

    /// Waits for write readiness, then calls `flush` on the wrapped I/O
    /// object, treating `WouldBlock` the same way as `poll_write` does.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        ready!(self.poll_write_ready(cx))?;

        let outcome = (*self).get_mut().flush();

        if !is_wouldblock(&outcome) {
            return Poll::Ready(outcome);
        }

        self.clear_write_ready(cx)?;
        Poll::Pending
    }

    /// Completes immediately with `Ok(())`; nothing is done to the wrapped
    /// I/O object here.
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

/// Returns `true` only when `r` is an `Err` whose kind is
/// `io::ErrorKind::WouldBlock`; `Ok` values and every other error kind yield
/// `false`.
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
    r.as_ref()
        .err()
        .map_or(false, |err| err.kind() == io::ErrorKind::WouldBlock)
}

impl<E: Evented + fmt::Debug> fmt::Debug for PollEvented<E> {
    /// Formats as a `PollEvented` struct showing only the `io` field; the
    /// registration and cached readiness are left out.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("PollEvented");
        builder.field("io", &self.io);
        builder.finish()
    }
}

impl<E: Evented> Drop for PollEvented<E> {
    /// Deregisters the wrapped I/O resource, if it is still present, before
    /// it is dropped.
    fn drop(&mut self) {
        // `io` is already `None` when `into_inner` took the resource and
        // attempted the deregistration itself; nothing is left to do then.
        if let Some(io) = self.io.take() {
            // Ignore errors
            let _ = self.inner.registration.deregister(&io);
        }
    }
}
